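/**
 * FileName: StructuredStreaming11
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/01/08 10:12
 * Description: 采用Structured Streaming进行数据采集
 * 放弃使用类映射的方式生成schema
 * 转为使用withColumn和select方式
 * 将一列数据分割成为多列
 * (English: collects data with Structured Streaming. Instead of deriving the
 * schema by mapping records onto a class, it uses withColumn/select to split a
 * single column into multiple columns.)
 */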
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.util.ColumnsUtil;
import cn.com.bonc.util.StructStreamingUtil;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.io.IOException;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.split;

/**
 * Structured Streaming job that reads '|'-delimited records from Kafka, splits each record
 * into separate columns, joins the columns back into a single line and prints the result
 * to the console.
 *
 * <p>The schema is not derived from a mapped class. Instead, the decoded Kafka {@code value}
 * column is split with {@code withColumn}/{@code select}.
 */
public class StructuredStreaming11 {

	/** Regex that splits a record on the literal '|' character. */
	private static final String SPLIT_REGEX = "[|]";

	/** Separator used when joining the split columns back into one line. */
	private static final String FIELD_DELIMITER = "|";

	/**
	 * Builds the streaming pipeline and blocks until the query terminates.
	 *
	 * @param args unused
	 * @throws IllegalStateException if the streaming query terminates with an error
	 */
	public static void main(String[] args){

		SparkSession spark = SparkSession
				.builder()
				.appName("MyAutoMappingDataApp")
				.getOrCreate();

		try {
			Dataset<Row> kafkaDataset = readKafka(spark);

			// Split one column into multiple columns, keeping only well-formed records.
			Dataset<Row> rowDataset = splitIntoColumns(kafkaDataset);

			// Convert the multiple columns back into a single '|'-delimited line.
			Dataset<String> lineDataset = rowDataset.map(
					(MapFunction<Row, String>) StructuredStreaming11::joinColumns, Encoders.STRING());

			// Write to the console.
			StreamingQuery query = lineDataset
					.writeStream()
					.format("console")
					.outputMode(OutputMode.Append())
					.start();

			// Block the driver so the streaming query keeps running.
			query.awaitTermination();
		} catch (StreamingQueryException e) {
			// Propagate with the cause so the driver fails visibly instead of exiting normally.
			throw new IllegalStateException("Streaming query terminated with an error", e);
		} finally {
			spark.stop();
		}
	}

	/**
	 * Creates the Kafka streaming source from the configured bootstrap servers, topics and
	 * starting offsets.
	 *
	 * @param spark active session
	 * @return the raw Kafka streaming Dataset
	 */
	private static Dataset<Row> readKafka(SparkSession spark) {
		return spark
				.readStream()
				.format("kafka")
				.option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
				.option("subscribe", ConfigurationManager.getProperty(Constants.KAFKA_TOPICS))
				.option("startingOffsets",ConfigurationManager.getProperty(Constants.KAFKA_AUTO_OFFSET_RESET))
				.load();
	}

	/**
	 * Decodes the Kafka {@code value} as a string and drops records rejected by
	 * {@code StructStreamingUtil.isStandardData}. The remaining records are split on '|'
	 * into the columns returned by {@code ColumnsUtil.getInstance().getColumns("tmp")}.
	 *
	 * @param kafkaDataset raw Kafka streaming Dataset
	 * @return Dataset with one column per split field
	 */
	private static Dataset<Row> splitIntoColumns(Dataset<Row> kafkaDataset) {
		return kafkaDataset
				.select(col("value").cast("string").alias("value"))
				// Filter on the decoded record text, not on the whole Kafka row. The filtered
				// Dataset must be the one used downstream, because Dataset is immutable.
				.filter((FilterFunction<Row>) row -> {
					String line = row.getString(0);
					// Kafka records may carry a null value (e.g. tombstones); skip them.
					return line != null && StructStreamingUtil.isStandardData(line);
				})
				.withColumn("tmp", split(col("value"), SPLIT_REGEX))
				.select(ColumnsUtil.getInstance().getColumns("tmp"))
				.drop(col("tmp"));
	}

	/**
	 * Joins all fields of {@code row} with '|'. Null fields are rendered as "null".
	 * Unlike {@code toString()} followed by replacing ',' with '|', commas inside field
	 * values are preserved.
	 *
	 * @param row row to join
	 * @return the '|'-delimited line
	 */
	private static String joinColumns(Row row) {
		return row.mkString(FIELD_DELIMITER);
	}

}
